Now that the basic HDFS operations and commands have been covered, this post walks through an actual MapReduce distributed-computation example. Finally, time to write some code! Since Hadoop itself is written in Java, MapReduce jobs are naturally implemented in Java as well. This post implements the "Hello World" of the MapReduce world: word count.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HadoopWordCountSample {

    public static class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        // Used for counting; fixed at 1. One is emitted for every word found,
        // and the occurrences of the same word are summed up in the reducer.
        private final static IntWritable plugOne = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            // StringTokenizer performs better than split here. By default it
            // splits on spaces, tabs, and newlines.
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, plugOne);
            }
        }
    }

    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context
                ) throws IOException, InterruptedException {
            // Sum all the counts emitted for this word.
            int reduceSum = 0;
            for (IntWritable val : values) {
                reduceSum += val.get();
            }
            result.set(reduceSum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config, "hadoop word count example");
        job.setJarByClass(HadoopWordCountSample.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapperClass(WordCountMapper.class);
        // With setCombinerClass set, each mapper runs a local reduce on its
        // sorted output before sending it on. Reusing the reducer as the
        // combiner works here because summing counts is associative.
        job.setCombinerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // The first argument (args[0]) is the path of the input files.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // The second argument (args[1]) is the path where the results are stored.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Use vi, vim, or any other editor to paste the code above and save it. Assume the file is named HadoopWordCountSample.java.
The JAVA_HOME and HADOOP_HOME environment variables should already be set from the earlier installation steps, so the only thing left to configure is HADOOP_CLASSPATH.
# Edit the ~/.bashrc file
sudo vi ~/.bashrc
# Add HADOOP_CLASSPATH. Note: this line must come after the JAVA_HOME line, otherwise it will fail.
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
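A change to ~/.bashrc does not affect the current shell until it is reloaded, so reload it (or open a new terminal) before compiling:
# Reload the shell configuration
source ~/.bashrc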
#Compile
hadoop com.sun.tools.javac.Main HadoopWordCountSample.java
#Package
jar cf hwcs.jar HadoopWordCountSample*.class
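If you want to confirm what ended up in the package, listing the jar contents is a quick sanity check; it should show the HadoopWordCountSample classes produced by the compile step above:
# List the classes packaged into the jar
jar tf hwcs.jar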
First, create two files on the local machine, wordcount_target1 and wordcount_target2, with the following contents:
I am Jack
I am the king of the world
I am Rose
I am looking for Jack
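A minimal way to create them, assuming the first two lines go into wordcount_target1 and the remaining two into wordcount_target2 (any split works, since the job counts words across all input files):
# Create the two sample input files locally (assumed split between the files)
printf 'I am Jack\nI am the king of the world\n' > wordcount_target1
printf 'I am Rose\nI am looking for Jack\n' > wordcount_target2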
Next, create a directory on HDFS and upload the two files into it:
# Create the directory
hadoop fs -mkdir -p /tmp/wordcount_target
# Upload the files
hadoop fs -put wordcount_target1 wordcount_target2 /tmp/wordcount_target
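To double-check that both files landed on HDFS before running the job:
# List the uploaded files
hadoop fs -ls /tmp/wordcount_target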
Now submit the word count job against the uploaded files:
hadoop jar hwcs.jar HadoopWordCountSample /tmp/wordcount_target /tmp/wordcount_result
# Explanation
hwcs.jar: path to the MapReduce jar to execute (including the file name)
HadoopWordCountSample: the class name to run
/tmp/wordcount_target: path of the input files
/tmp/wordcount_result: path where the results will be stored
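One thing worth knowing: FileOutputFormat refuses to write into an output directory that already exists, so if you rerun the job, remove the previous result directory first:
# Remove a previous result directory before rerunning (only if it exists)
hadoop fs -rm -r /tmp/wordcount_result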
Once the MapReduce job has been submitted to Hadoop successfully, YARN takes over resource management, and you can watch the job run through the web UI at http://{host_or_ip}:8088.
Note: replace {host_or_ip} with the host name or IP of the machine where the NameNode was installed. If you used the standalone installation mode, use localhost.
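When the job finishes, the output directory contains a _SUCCESS marker plus one part-r-NNNNN file per reducer; with the single reducer used here, all counts end up in part-r-00000. A quick way to see the files:
# List the job output
hadoop fs -ls /tmp/wordcount_result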
hadoop fs -cat /tmp/wordcount_result/part-r-00000
# Result
am 4
for 1
king 1
looking 1
of 1
the 2
world 1
I 4
Jack 2
Rose 1
After running the program successfully, some readers are probably still full of question marks, so the next post will explain how the code actually works.
Sorry to bother you with another question: where does hwcs.jar come from? Apologies for the interruption.
Never mind, I see now that it is the jar file you package yourself.